package com.lyon.dmeo.storage.client.raft;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.RandomUtil;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import com.lyon.demo.storage.client.api.core.core.Entry;
import com.lyon.demo.storage.client.api.core.command.request.PushEntryRequest;
import com.lyon.demo.storage.client.api.core.command.request.PushEntryRequest.Type;
import com.lyon.demo.storage.client.api.core.command.request.RequestOrResponse;
import com.lyon.demo.storage.client.api.core.command.response.PushEntryResponse;
import com.lyon.demo.storage.client.api.core.command.response.PushEntryResponse.PushResponseType;
import com.lyon.demo.storage.client.api.core.config.DLedgerConfig;
import com.lyon.demo.storage.client.api.core.core.AbstractStoreService;
import com.lyon.demo.storage.client.api.core.core.MemberState;
import com.lyon.demo.storage.common.SystemClock;
import com.lyon.demo.storage.common.thread.ShutdownAble;
import com.lyon.demo.storage.common.util.TimeUtil;
import com.lyon.dmeo.storage.client.raft.endpoint.RpcRemoteService;
import com.lyon.dmeo.storage.client.raft.core.AppendEntryResponse;
import com.lyon.dmeo.storage.client.raft.core.AppendFuture;
import com.lyon.dmeo.storage.client.raft.core.ShutdownAbleThread;
import com.lyon.dmeo.storage.client.raft.core.exception.CommonException;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * raft 当前peer 推送服务
 *
 * @author LeeYan9
 * @since 2022-05-27
 */
@SuppressWarnings({"AlibabaClassNamingShouldBeCamel", "rawtypes"})
public class DLedgerEntryPusher implements ShutdownAble {

    private final MemberState memberState;

    private final RpcRemoteService rpcRemoteService;

    private final DLedgerConfig dLedgerConfig;
    private final Logger log = LoggerFactory.getLogger(DLedgerEntryPusher.class);

    private final EntryReceiveHandler entryReceiveHandler;
    private final QuorumAckChecker quorumAckChecker;
    private final Map<String, EntryDispatcher> entryDispatcherMap = new ConcurrentHashMap<>();
    private final AbstractStoreService storeService;

    /**
     * Leader向每个Follower推送成功的条目下标
     * <p>
     * 说明:Leader节点把index下标对应的条目,发送给Follower.
     * 如果Follower成功地保存此条目.
     * 此时Leader节点:修改下标.
     * <p>
     * <p>
     * 备注:如果集群中有3个节点,则内部map中就会有3个key.
     * <p>
     * 假设同一个term下,内部map的值为:
     * peerId    index
     * n1        5     表示:给peerId=n1的Follower,已经成功保存了index = 5的条目.
     * n2        6     表示:给peerId=n2的Follower,已经成功保存了index = 6的条目.
     * n3        7     表示:给peerId=n3的Follower,已经成功保存了index = 7的条目.
     * n4        8     表示:给peerId=n4的Follower,已经成功保存了index = 8的条目.
     * n0        9   (n0为Leader节点,index=9表示Leader保存最新条目到Leader自己的磁盘中.但是并没有超过半数的Follower ACK)
     * Map<term,<peerId,indexForAppended>>
     */
    private final ConcurrentHashMap<Long, Map<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
    /**
     * Map<term,<index,Response>>
     */
    private final ConcurrentHashMap<Long, Map<Long, ResponseFuture<AppendEntryResponse>>> pendingAppendResponseByTerm = new ConcurrentHashMap<>();

    private static final long DEFAULT_ACK_PRINT_TIMES = 3000;
    private static final long DEFAULT_TIMES = 1000L;

    /**
     * Builds the pusher and wires one {@link EntryDispatcher} per remote peer.
     *
     * @param memberState      cluster membership / role state
     * @param dLedgerConfig    configuration (ack timeout, read timeout, ...)
     * @param rpcRemoteService transport used to push entries to peers
     * @param storeService     local log store
     */
    public DLedgerEntryPusher(MemberState memberState, DLedgerConfig dLedgerConfig, RpcRemoteService rpcRemoteService,
                              AbstractStoreService storeService) {
        this.memberState = memberState;
        this.dLedgerConfig = dLedgerConfig;
        this.rpcRemoteService = rpcRemoteService;
        this.storeService = storeService;
        this.entryReceiveHandler = new EntryReceiveHandler(log);
        this.quorumAckChecker = new QuorumAckChecker();
        // one dedicated dispatcher per follower (self excluded)
        for (String peerId : memberState.getPeersWithoutSelf().keySet()) {
            entryDispatcherMap.put(peerId, new EntryDispatcher(peerId));
        }
    }

    /**
     * Starts all worker threads: the follower-side receive handler, the
     * leader-side per-peer dispatchers and the quorum ACK checker.
     */
    public void startup() {
        // follower role: serves push requests coming from the leader
        entryReceiveHandler.start();
        // leader role: one pusher thread per follower
        for (EntryDispatcher dispatcher : entryDispatcherMap.values()) {
            dispatcher.start();
        }
        // leader role: watches the water marks and commits on quorum
        quorumAckChecker.start();
    }

    /**
     * Entry point used by the RPC layer when this node (as a follower)
     * receives a push from the leader; delegates to the handler thread.
     *
     * @param request pushed request (APPEND/COMMIT/COMPARE/TRUNCATE)
     * @return future completed once the handler thread has processed it
     */
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) {
        return entryReceiveHandler.handlePush(request);
    }

    /**
     * Registers a pending ACK for an entry the leader has just appended
     * locally. The returned future is completed by {@link QuorumAckChecker}
     * once a quorum of peers has stored the entry.
     *
     * @param entry entry freshly appended on the leader
     * @return future resolved with the append result (or a timeout)
     */
    public ResponseFuture<AppendEntryResponse> waitAck(Entry entry) {
        checkTermForWaterMark(memberState.getCurrTerm());
        // the leader's own water mark advances as soon as it persists locally
        updatePeerWaterMark(memberState.getCurrTerm(), memberState.getSelfId(), entry.getIndex());
        AppendFuture<AppendEntryResponse> future =
                new AppendFuture<>(RandomUtil.randomLong(32), dLedgerConfig.getMaxAckTimeMs());
        future.setPos(entry.getPos());
        checkTermForPendingMap(memberState.getCurrTerm());
        pendingAppendResponseByTerm.get(memberState.getCurrTerm()).put(entry.getIndex(), future);
        return future;
    }

    /**
     * Stops every worker thread started by {@link #startup()}.
     */
    @Override
    public void shutdown() {
        for (EntryDispatcher dispatcher : entryDispatcherMap.values()) {
            dispatcher.shutdown();
        }
        entryReceiveHandler.shutdown();
        quorumAckChecker.shutdown();
    }

    /**
     * Follower-side worker: consumes push requests queued by
     * {@link #handlePush(PushEntryRequest)} and applies them to the local
     * store. COMMIT/COMPARE/TRUNCATE requests take priority over APPENDs,
     * and APPENDs are applied strictly in index order (endIndex + 1).
     */
    @EqualsAndHashCode(callSuper = false)
    public class EntryReceiveHandler extends ShutdownAbleThread {

        /** Pending APPEND requests keyed by entry index. */
        private final ConcurrentHashMap<Long, Pair<PushEntryRequest, ResponseFuture<PushEntryResponse>>> writeRequestMap;
        /** COMMIT/COMPARE/TRUNCATE requests, handled before any APPEND. */
        private final LinkedBlockingQueue<Pair<PushEntryRequest, ResponseFuture<PushEntryResponse>>> compareOrTruncateRequests;
        /**
         * Last time {@code writeRequestMap} was scanned for stale requests.
         * BUGFIX: was {@code final} (stuck at -1), so the throttle in
         * {@link #checkAppendRequests()} never engaged; it is now refreshed
         * after every scan.
         */
        private long lastAppendCheckTimeMs;

        public EntryReceiveHandler(Logger log) {
            super(log);
            this.writeRequestMap = new ConcurrentHashMap<>();
            this.compareOrTruncateRequests = new LinkedBlockingQueue<>();
            this.lastAppendCheckTimeMs = -1;
        }

        @Override
        protected void doWork() {
            if (!memberState.isFollower()) {
                return;
            }
            // compare/truncate/commit requests have priority over appends
            Pair<PushEntryRequest, ResponseFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
            if (Objects.nonNull(pair)) {
                PushEntryRequest request = pair.getKey();
                switch (request.getType()) {
                    case COMMIT:
                        // BUGFIX: COMMIT requests carry a null entry (the leader builds
                        // them with buildPushRequest(null, COMMIT)), so the original
                        // request.getEntry().getIndex() threw an NPE out of doWork.
                        // A COMMIT only transports the leader's committedIndex.
                        handleDoCommit(request.getCommittedIndex(), request, pair.getValue());
                        break;
                    case COMPARE:
                        handleDoCompare(request.getEntry().getIndex(), request, pair.getValue());
                        break;
                    case TRUNCATE:
                        handleDoTruncate(request.getEntry().getIndex(), request, pair.getValue());
                        break;
                    default:
                        break;
                }
                return;
            }
            // appends must be applied strictly in index order: next = endIndex + 1
            long nextIndex = storeService.getEndIndex() + 1;
            Pair<PushEntryRequest, ResponseFuture<PushEntryResponse>> appendPair = writeRequestMap.get(nextIndex);
            if (Objects.isNull(appendPair)) {
                checkAppendRequests();
                return;
            }
            handleAppend(nextIndex, appendPair.getKey(), appendPair.getValue());
        }

        /**
         * Appends one entry pushed by the leader at {@code index}
         * (== local endIndex + 1) and replies to the leader.
         */
        private void handleAppend(long index, PushEntryRequest request, ResponseFuture<PushEntryResponse> future) {
            try {
                Assert.notNull(request, CommonException.UNKNOWN);
                // null-check the entry BEFORE dereferencing it (original order NPE'd)
                Assert.notNull(request.getEntry(), CommonException.UNKNOWN);
                Assert.isTrue(index == request.getEntry().getIndex(), CommonException.UNKNOWN);
                // persist the entry locally
                long appendedIndex = storeService.appendAsFollower(request.getEntry());
                Assert.isTrue(appendedIndex == index, CommonException.INCONSISTENT_STATE);
                // ack the leader
                future.complete(buildPushResponse(storeService.get(index), PushResponseType.SUCCESS));
                // NOTE(review): passes the appended index, not request.getCommittedIndex();
                // assumes updateCommittedIndex clamps internally — verify against the store
                storeService.updateCommittedIndex(request.getTerm(), appendedIndex);
            } catch (Exception ex) {
                future.complete(buildPushResponse(null, PushResponseType.INCONSISTENT_STATE));
                log.error("", ex);
            }
        }

        /**
         * Drops stale pending APPEND requests, at most once per
         * {@link #DEFAULT_TIMES} ms: those already persisted
         * (index <= endIndex) and those whose future has timed out.
         */
        private void checkAppendRequests() {
            if (TimeUtil.elapsed(lastAppendCheckTimeMs) < DEFAULT_TIMES) {
                return;
            }
            long endIndex = storeService.getEndIndex();
            for (Map.Entry<Long, Pair<PushEntryRequest, ResponseFuture<PushEntryResponse>>> entry : writeRequestMap.entrySet()) {
                Long index = entry.getKey();
                if (index <= endIndex || entry.getValue().getValue().isTimeout()) {
                    writeRequestMap.remove(index);
                }
            }
            // BUGFIX: refresh the timestamp so the scan is actually throttled
            lastAppendCheckTimeMs = SystemClock.now();
        }


        /**
         * Applies the leader's committedIndex locally. COMMIT requests carry
         * no entry, only the committed index piggybacked on the request.
         */
        private void handleDoCommit(long committedIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                Assert.isTrue(committedIndex == request.getCommittedIndex(), CommonException.UNKNOWN);
                storeService.updateCommittedIndex(request.getTerm(), committedIndex);
                future.complete(buildPushResponse(null, PushResponseType.SUCCESS));
            } catch (Exception ex) {
                future.complete(buildPushResponse(null, PushResponseType.INCONSISTENT_STATE));
                log.error("", ex);
            }
        }

        /**
         * Compares the leader's entry at {@code index} with the local one and
         * reports SUCCESS only when both exist and match.
         */
        private void handleDoCompare(long index, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                // index consistency between envelope and payload
                Assert.isTrue(index == request.getIndex(), CommonException.UNKNOWN);
                // the local store must hold an entry at this index
                Entry entry = storeService.get(index);
                Assert.notNull(entry, CommonException.INCONSISTENT_STATE);
                // and it must be identical to the leader's entry
                Assert.isTrue(entry.equalsTo(request.getEntry()), CommonException.INCONSISTENT_STATE);
                future.complete(buildPushResponse(entry, PushResponseType.SUCCESS));
            } catch (Exception ex) {
                future.complete(buildPushResponse(null, PushResponseType.INCONSISTENT_STATE));
                log.error("", ex);
            }
        }

        /**
         * Truncates the local log at {@code index}, re-storing the leader's
         * entry there, then commits up to the truncation point.
         */
        private void handleDoTruncate(long index, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                Assert.isTrue(index == request.getIndex(), CommonException.UNKNOWN);
                Assert.isTrue(Type.TRUNCATE == request.getType(), CommonException.UNKNOWN);
                // drop divergent suffix and adopt the leader's entry
                long truncateIndex = storeService.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());
                Assert.isTrue(truncateIndex == index, CommonException.INCONSISTENT_STATE);
                storeService.updateCommittedIndex(request.getTerm(), truncateIndex);
                future.complete(buildPushResponse(request.getEntry(), PushResponseType.SUCCESS));
            } catch (Exception ex) {
                future.complete(buildPushResponse(null, PushResponseType.INCONSISTENT_STATE));
                log.error("", ex);
            }
        }


        /**
         * Enqueues a push request for the worker thread and returns the future
         * that will carry the response. COMPARE/TRUNCATE flush all pending
         * appends, since the log may be about to diverge.
         */
        @SneakyThrows
        public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) {
            Type type = request.getType();
            ResponseFuture<PushEntryResponse> future = new ResponseFuture<>(-1L, 1000L);
            Entry entry;
            switch (type) {
                case APPEND:
                    entry = request.getEntry();
                    Assert.notNull(entry, CommonException.UNKNOWN);
                    writeRequestMap.put(entry.getIndex(), new Pair<>(request, future));
                    break;
                case COMMIT:
                    // no entry on COMMIT; only the committedIndex matters
                    compareOrTruncateRequests.add(new Pair<>(request, future));
                    break;
                case COMPARE:
                case TRUNCATE:
                    entry = request.getEntry();
                    Assert.notNull(entry, CommonException.UNKNOWN);
                    compareOrTruncateRequests.add(new Pair<>(request, future));
                    // pending appends are invalid while comparing/truncating
                    writeRequestMap.clear();
                    break;
                default:
                    future.complete(buildPushResponse(null, PushResponseType.UNKNOWN));
                    break;
            }
            wakeup();
            return future;
        }

        /** Builds a push response stamped with membership and store metadata. */
        public PushEntryResponse buildPushResponse(Entry entry, PushResponseType type) {
            PushEntryResponse response = new PushEntryResponse(type);
            fillMetaInfoForReqOrResponse(response);
            response.setBeginIndex(storeService.getBeginIndex());
            response.setEndIndex(storeService.getEndIndex());
            response.setCommittedIndex(storeService.getCommittedIndex());
            response.setEntry(entry);
            response.setIndex(Objects.isNull(entry) ? -1 : entry.getIndex());
            return response;
        }
    }

    /**
     * Stamps common membership metadata (term, group, leaderId, localId) on
     * an outgoing request or response.
     */
    private void fillMetaInfoForReqOrResponse(RequestOrResponse target) {
        target.setTerm(memberState.getCurrTerm());
        target.setGroup(memberState.getGroup());
        target.setLeaderId(memberState.getLeaderId());
        target.setLocalId(memberState.getSelfId());
    }

    /**
     * Leader-side worker: watches {@link #peerWaterMarksByTerm}, computes the
     * quorum-acknowledged index, advances the committed index and completes
     * the pending client futures registered by {@link #waitAck(Entry)}.
     */
    @SuppressWarnings("SingleStatementInBlock")
    @EqualsAndHashCode(callSuper = false)
    public class QuorumAckChecker extends ShutdownAbleThread {

        /** Last time the water marks were dumped to the log. */
        private long lastPrintWaterMarksTimeMs = -1;
        /** Highest index known to be replicated on a quorum (and committed). */
        private long lastQuorumIndex = -1;
        /** Last time the pending-response map was swept for leaked futures. */
        private long lastLeakCheckTimeMs = -1;

        public QuorumAckChecker() {
            super(log);
        }

        @Override
        protected void doWork() {
            // periodic diagnostics
            if (TimeUtil.elapsed(lastPrintWaterMarksTimeMs) > DEFAULT_ACK_PRINT_TIMES) {
                logger.info("quorumAckChecker.. [{}]-[{}] term:[{}] beginIndex:[{}] endIndex:[{}] committedIndex:[{}] JSON:[{}]",
                        memberState.getSelfId(),
                        memberState.getRole(),
                        memberState.getCurrTerm(),
                        storeService.getBeginIndex(),
                        storeService.getEndIndex(),
                        storeService.getCommittedIndex(),
                        peerWaterMarksByTerm);
                lastPrintWaterMarksTimeMs = SystemClock.now();
            }
            // only the leader resolves quorum ACKs
            if (!memberState.isLeader()) {
                waitForRunning(1);
                return;
            }
            long currTerm = memberState.getCurrTerm();
            checkTermForPendingMap(currTerm);
            checkTermForWaterMark(currTerm);
            checkTermChanged(currTerm);

            doAckCheck(currTerm);
        }

        private void doAckCheck(long currTerm) {
            Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);

            // the median of the sorted water marks is the highest index stored
            // by a majority of the cluster
            List<Long> sortedPeerWaterMarksIndex = CollUtil.sort(peerWaterMarks.values(), Comparator.naturalOrder());
            Long quorumIndex = sortedPeerWaterMarksIndex.get(sortedPeerWaterMarksIndex.size() / 2);
            if (quorumIndex <= lastQuorumIndex) {
                return;
            }
            // only the leader may advance the committed index, and only up to
            // the quorum-acknowledged index
            storeService.updateCommittedIndex(currTerm, quorumIndex);
            // BUGFIX: remember the previous quorum index BEFORE overwriting it.
            // The original overwrote lastQuorumIndex first and then looped with
            // "index >= lastQuorumIndex", which only ever completed the single
            // future at quorumIndex and skipped everything in between.
            long prevQuorumIndex = lastQuorumIndex;
            lastQuorumIndex = quorumIndex;

            boolean needCheck = false;
            int ackNum = 0;
            // complete the client futures for every newly committed index
            Map<Long, ResponseFuture<AppendEntryResponse>> responses = pendingAppendResponseByTerm.get(currTerm);
            for (long index = quorumIndex; index > prevQuorumIndex; index--) {
                ResponseFuture<AppendEntryResponse> responseFuture = responses.remove(index);
                if (responseFuture == null) {
                    // a gap means futures may be leaked; trigger the sweep below
                    needCheck = true;
                    break;
                }
                if (!responseFuture.isDone()) {
                    AppendEntryResponse response = buildAppendResponse(index, AppendEntryResponse.Code.SUCCESS);
                    response.setPos(((AppendFuture) responseFuture).getPos());
                    responseFuture.complete(response);
                }
                ackNum++;
            }

            // nothing acked this round: fail fast any future already timed out
            if (ackNum == 0) {
                checkAppendResponseForTimeout(quorumIndex, responses);
            }

            // periodic leak sweep. BUGFIX: the original tested
            // TimeUtil.elapsed(SystemClock.now()), which is always ~0, so the
            // sweep only ever ran when needCheck was set.
            if (TimeUtil.elapsed(lastLeakCheckTimeMs) > DEFAULT_TIMES || needCheck) {
                updatePeerWaterMark(currTerm, memberState.getSelfId(), quorumIndex);

                for (Map.Entry<Long, ResponseFuture<AppendEntryResponse>> entry : responses.entrySet()) {
                    ResponseFuture<AppendEntryResponse> responseFuture = entry.getValue();
                    Long index = entry.getKey();
                    if (index < quorumIndex) {
                        AppendEntryResponse response = buildAppendResponse(index, AppendEntryResponse.Code.SUCCESS);
                        response.setPos(((AppendFuture) responseFuture).getPos());
                        responseFuture.complete(response);
                        responses.remove(index);
                    }
                }
                lastLeakCheckTimeMs = SystemClock.now();
            }
        }

        /**
         * Completes (with TIMI_OUT) the contiguous futures just above the
         * quorum index that have exceeded their deadline.
         */
        private void checkAppendResponseForTimeout(Long quorumIndex, Map<Long, ResponseFuture<AppendEntryResponse>> responses) {
            for (long index = quorumIndex + 1; index < Long.MAX_VALUE; index++) {
                // BUGFIX: the original looked up responses.get(quorumIndex) on
                // every iteration instead of the loop index
                ResponseFuture<AppendEntryResponse> future = responses.get(index);
                if (future == null) {
                    break;
                }
                if (future.isTimeout()) {
                    AppendEntryResponse response = buildAppendResponse(index, AppendEntryResponse.Code.TIMI_OUT);
                    future.complete(response);
                    responses.remove(index);
                }
            }
        }

        /** Builds an append response stamped with membership and store metadata. */
        private AppendEntryResponse buildAppendResponse(long index, AppendEntryResponse.Code code) {
            AppendEntryResponse response = new AppendEntryResponse();
            fillMetaInfoForReqOrResponse(response);
            response.setCode(code);
            response.setIndex(index);
            response.setBeginIndex(storeService.getBeginIndex());
            response.setEndIndex(storeService.getEndIndex());
            response.setCommittedIndex(storeService.getCommittedIndex());
            return response;
        }

        /**
         * On term change: fails all pending futures of older terms with
         * TERM_CHANGED and drops stale water-mark maps. ConcurrentHashMap
         * iterators are weakly consistent, so removal while iterating is safe.
         */
        @SuppressWarnings("Convert2streamapi")
        private void checkTermChanged(long currTerm) {
            if (pendingAppendResponseByTerm.size() > 1) {
                for (Long term : pendingAppendResponseByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    for (Map.Entry<Long, ResponseFuture<AppendEntryResponse>> entry : pendingAppendResponseByTerm.get(term).entrySet()) {
                        ResponseFuture<AppendEntryResponse> responseFuture = entry.getValue();
                        AppendEntryResponse response = buildAppendResponse(entry.getKey(), AppendEntryResponse.Code.TERM_CHANGED);
                        responseFuture.complete(response);
                        logger.info("TERM_CHANGED clear pending response index:[{}] for term form [{}] to [{}] ",
                                entry.getKey(), term, currTerm);
                    }
                    pendingAppendResponseByTerm.remove(term);
                }
            }

            if (peerWaterMarksByTerm.size() > 1) {
                for (Long term : peerWaterMarksByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    logger.info("TERM_CHANGED clear peerWaterMarksByTerm for oldTerm:{} currTerm:{} ", term, currTerm);
                    peerWaterMarksByTerm.remove(term);
                }
            }
        }
    }


    /**
     * Monotonically raises the replicated-index water mark of {@code peerId}
     * for {@code currTerm}. Synchronized so concurrent dispatcher/checker
     * threads cannot overwrite a higher mark with a lower one.
     */
    private void updatePeerWaterMark(long currTerm, String peerId, Long quorumIndex) {
        synchronized (peerWaterMarksByTerm) {
            checkTermForWaterMark(currTerm);
            Map<String, Long> waterMarks = peerWaterMarksByTerm.get(currTerm);
            // BUGFIX: default to -1 so an unknown peerId cannot NPE on unboxing
            if (waterMarks.getOrDefault(peerId, -1L) < quorumIndex) {
                waterMarks.put(peerId, quorumIndex);
            }
        }
    }

    /**
     * Lazily initializes the per-peer water-mark map for {@code currTerm},
     * seeding every known peer at -1 (nothing replicated yet).
     */
    private void checkTermForWaterMark(long currTerm) {
        if (peerWaterMarksByTerm.containsKey(currTerm)) {
            return;
        }
        ConcurrentHashMap<String, Long> waterMarks = new ConcurrentHashMap<>(16);
        memberState.getPeerMap().keySet().forEach(peerId -> waterMarks.put(peerId, -1L));
        peerWaterMarksByTerm.putIfAbsent(currTerm, waterMarks);
    }

    /**
     * Lazily initializes the pending-response map for {@code currTerm}.
     */
    private void checkTermForPendingMap(long currTerm) {
        pendingAppendResponseByTerm.computeIfAbsent(currTerm, term -> new ConcurrentHashMap<>(16));
    }


    /**
     * Leader-side worker, one per follower: replicates log entries to
     * {@code peerId}. Starts in COMPARE mode to locate the last index both
     * sides agree on, TRUNCATEs the follower if it has diverged, then streams
     * entries in APPEND mode and periodically pushes the committed index.
     */
    @EqualsAndHashCode(callSuper = false)
    public class EntryDispatcher extends ShutdownAbleThread {

        /** Term this dispatcher is operating in (refreshed from memberState). */
        private long term;

        /** Leader id last observed (self, when this node leads). */
        private String leaderId;

        /**
         * Id of the remote peer served by this dispatcher.
         */
        private final String peerId;

        private long compareIndex = -1;
        private long truncateIndex = -1;
        /** Last time the committedIndex reached the peer (COMMIT/APPEND/TRUNCATE all carry it). */
        private long lastPushCommittedTimeMs = -1;
        /** Last time in-flight appends were leak-checked. */
        private long lastAppendLeakTimeMs = -1;

        /** index -> send timestamp of in-flight APPEND pushes. */
        private final ConcurrentHashMap<Long, Long> pendingMap = new ConcurrentHashMap<>();

        /**
         * Next index to push to the peer while in APPEND mode.
         */
        private long prepareWriteOffset = -1;

        /** Current replication mode for this peer. */
        private final AtomicReference<Type> type = new AtomicReference<>(Type.COMPARE);

        public EntryDispatcher(String peerId) {
            super(log);
            this.peerId = peerId;
        }

        @Override
        protected void doWork() {
            try {
                if (!checkAndFreshState()) {
                    return;
                }
                if (!memberState.isLeader()) {
                    return;
                }
                switch (type.get()) {
                    case APPEND:
                        doAppend();
                        break;
                    case COMPARE:
                        doCompare();
                        break;
                    default:
                        break;
                }
            } catch (Exception ex) {
                log.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), prepareWriteOffset, compareIndex, ex);
                // fall back to COMPARE so the next round re-establishes a safe state
                changeState(-1, PushEntryRequest.Type.COMPARE);
                TimeUtil.sleep(500);
            }
        }

        /**
         * Locates the highest index at which this leader and the peer hold the
         * same entry. On success switches to APPEND; on divergence triggers a
         * TRUNCATE of the peer.
         */
        @SneakyThrows
        private void doCompare() {
            if (!checkAndFreshState()) {
                return;
            }
            if (type.get() != Type.COMPARE) {
                return;
            }
            // nothing to compare while both logs are empty
            if (compareIndex == -1 && storeService.getEndIndex() == -1) {
                return;
            }
            while (true) {
                // BUGFIX: stop once doResolveCompareResponse has switched this
                // dispatcher to APPEND; the original kept comparing forever
                if (type.get() != Type.COMPARE) {
                    break;
                }
                if (compareIndex == -1) {
                    compareIndex = storeService.getEndIndex();
                    logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
                } else if (compareIndex > storeService.getEndIndex() || compareIndex < storeService.getBeginIndex()) {
                    compareIndex = storeService.getEndIndex();
                    logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, storeService.getBeginIndex(), storeService.getEndIndex());
                }
                Entry entry = storeService.get(compareIndex);
                Assert.notNull(entry, "entry 数据不存在 {}", compareIndex);
                PushEntryRequest pushRequest = buildPushRequest(entry, Type.COMPARE);
                CompletableFuture<PushEntryResponse> future = rpcRemoteService.push(pushRequest);
                PushEntryResponse response = future.get(dLedgerConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
                // decide: switch to APPEND, mark for truncate, or walk backwards
                doResolveCompareResponse(response);

                if (compareIndex < storeService.getBeginIndex()) {
                    truncateIndex = storeService.getBeginIndex();
                }

                // the peer has diverged: truncate it before appending
                if (truncateIndex != -1) {
                    changeState(truncateIndex, Type.TRUNCATE);
                    doTruncate(truncateIndex);
                    break;
                }
            }
        }

        /**
         * Interprets the peer's COMPARE response and updates
         * {@code compareIndex}/{@code truncateIndex}, or switches to APPEND.
         */
        private void doResolveCompareResponse(PushEntryResponse response) {
            if (PushResponseType.SUCCESS == response.getType()) {
                if (compareIndex == response.getEndIndex()) {
                    // the peer's log ends exactly at the agreed index: start appending
                    changeState(compareIndex, Type.APPEND);
                } else {
                    // the peer holds entries beyond the agreed index: they diverge, truncate
                    truncateIndex = compareIndex;
                }
            } else if (response.getEndIndex() < storeService.getBeginIndex()
                    || response.getBeginIndex() > storeService.getEndIndex()) {
                // BUGFIX: the original tested response.getBeginIndex() against the
                // leader's endIndex on BOTH sides (true whenever they merely
                // differed). The intended check is "the peer's [begin,end] range
                // does not intersect the leader's" -> resend from the leader's begin
                truncateIndex = storeService.getBeginIndex();
            } else if (compareIndex < response.getBeginIndex()) {
                // compare walked below the peer's first entry: restart from leader begin
                truncateIndex = storeService.getBeginIndex();
            } else if (compareIndex > response.getEndIndex()) {
                // leader has more entries than the peer: jump to the peer's last index
                compareIndex = response.getEndIndex();
            } else {
                // same range but different content at compareIndex: step back one
                compareIndex--;
            }
        }

        /**
         * Pushes a TRUNCATE at {@code truncateIndex}: the peer drops everything
         * after it and re-stores the entry. On success switches to APPEND.
         */
        @SneakyThrows
        private void doTruncate(long truncateIndex) {
            if (type.get() != Type.TRUNCATE) {
                return;
            }
            Entry entry = storeService.get(truncateIndex);
            Assert.notNull(entry, CommonException.UNKNOWN);
            log.debug("发送截断操作 index[{}] data[{}]", truncateIndex, entry);
            PushEntryRequest request = buildPushRequest(entry, Type.TRUNCATE);
            PushEntryResponse response = rpcRemoteService.push(request).get(dLedgerConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
            Assert.isTrue(PushResponseType.SUCCESS == response.getType(), CommonException.UNKNOWN);
            long endIndex = response.getEndIndex();
            Assert.isTrue(endIndex == truncateIndex, "数据截断异常 [{}:{}]", truncateIndex, endIndex);
            changeState(truncateIndex, Type.APPEND);
            lastPushCommittedTimeMs = SystemClock.now();
        }

        /**
         * Streams entries to the peer starting at {@code prepareWriteOffset}.
         * When caught up (or once per second) it pushes the committed index
         * and re-sends any append that appears stuck.
         */
        @SneakyThrows
        private void doAppend() {
            while (true) {
                if (!checkAndFreshState()) {
                    return;
                }
                if (type.get() != Type.APPEND) {
                    return;
                }
                // caught up with the local log, or time for the periodic check
                if (prepareWriteOffset > storeService.getEndIndex() || TimeUtil.elapsed(lastAppendLeakTimeMs) > DEFAULT_TIMES) {
                    // propagate the leader's committedIndex so the follower can commit
                    doCommit();
                    // re-push appends that got stuck or lost
                    checkAppendResponse();
                    lastAppendLeakTimeMs = SystemClock.now();
                    return;
                }

                // drop bookkeeping for appends the peer has already acknowledged
                // (water mark is loop-invariant: fetch it once, not per key)
                Long ackedIndex = getPeerWaterMark(term, peerId);
                for (Long index : pendingMap.keySet()) {
                    if (index <= ackedIndex) {
                        pendingMap.remove(index);
                    }
                }
                doAppendInner(prepareWriteOffset);
                prepareWriteOffset++;
            }
        }

        /**
         * Pushes the entry at {@code index} asynchronously; on SUCCESS raises
         * the peer water mark and wakes the quorum checker, on
         * INCONSISTENT_STATE falls back to COMPARE.
         */
        @SneakyThrows
        private void doAppendInner(long index) {
            Entry entry = checkEntry(index);
            checkQuotaAndWait(entry);
            PushEntryRequest pushEntryRequest = buildPushRequest(entry, Type.APPEND);
            CompletableFuture<PushEntryResponse> future = rpcRemoteService.push(pushEntryRequest);
            pendingMap.put(index, System.currentTimeMillis());
            future.whenComplete((response, ex) -> {
                try {
                    Assert.isNull(ex, "发送错误");
                    Assert.notNull(response, CommonException.UNKNOWN);
                    switch (response.getType()) {
                        case SUCCESS:
                            updatePeerWaterMark(term, peerId, index);
                            quorumAckChecker.wakeup();
                            break;
                        case INCONSISTENT_STATE:
                            changeState(index, Type.COMPARE);
                            logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, response.getIndex(), response.getTerm());
                            break;
                        default:
                            log.error("[Push-{}] get response error , info:[{}]", peerId, response.baseInfo());
                            break;
                    }
                } catch (Exception e) {
                    log.error("doAppendInner ERROR", e);
                }
            });
            // the APPEND request piggybacks the committedIndex, so it counts
            // as a commit push for throttling purposes
            lastPushCommittedTimeMs = SystemClock.now();
        }

        /** Flow-control hook: throttle pushes when the peer lags too far behind. */
        private void checkQuotaAndWait(Entry entry) {
            // todo: quota / back-pressure not implemented yet
        }

        /** Returns the acked water mark of {@code peerId} in {@code term}, or -1 if unknown. */
        private Long getPeerWaterMark(long term, String peerId) {
            Map<String, Long> map = peerWaterMarksByTerm.get(term);
            if (MapUtil.isEmpty(map)) {
                return -1L;
            }
            return map.getOrDefault(peerId, -1L);
        }

        /** Loads the entry at {@code index}, failing fast when the store has none. */
        private Entry checkEntry(long index) {
            Entry entry = storeService.get(index);
            Assert.notNull(entry, CommonException.UNKNOWN);
            return entry;
        }

        /**
         * Re-sends the first unacknowledged append when it has been in flight
         * for more than {@link #DEFAULT_TIMES} ms without raising the water mark.
         */
        private void checkAppendResponse() {
            Long peerWaterMarkIndex = getPeerWaterMark(term, peerId);
            // the peer already has everything we do
            if (peerWaterMarkIndex == storeService.getEndIndex()) {
                return;
            }
            Long sendTimeMs = pendingMap.get(peerWaterMarkIndex + 1);
            if (sendTimeMs != null && TimeUtil.elapsed(sendTimeMs) > DEFAULT_TIMES) {
                doAppendInner(peerWaterMarkIndex + 1);
            }
        }

        /**
         * Pushes a bare COMMIT (null entry, committedIndex piggybacked) to the
         * peer at most once per {@link #DEFAULT_TIMES} ms.
         */
        private void doCommit() {
            if (TimeUtil.elapsed(lastPushCommittedTimeMs) > DEFAULT_TIMES) {
                PushEntryRequest request = buildPushRequest(null, Type.COMMIT);
                rpcRemoteService.push(request);
                // BUGFIX: the original refreshed lastAppendLeakTimeMs here, so
                // this throttle never engaged and a COMMIT was pushed on every
                // idle loop iteration
                lastPushCommittedTimeMs = SystemClock.now();
            }
        }

        /** Builds a push request of {@code type} carrying current store metadata. */
        private PushEntryRequest buildPushRequest(Entry entry, Type type) {
            PushEntryRequest request = new PushEntryRequest();
            fillMetaInfoForReqOrResponse(request);
            request.setEntry(entry);
            request.setRemoteId(peerId);
            request.setType(type);
            request.setBeginIndex(storeService.getBeginIndex());
            request.setEndIndex(storeService.getEndIndex());
            request.setCommittedIndex(storeService.getCommittedIndex());
            return request;
        }

        /**
         * Verifies this node is still the leader and re-syncs
         * {@code term}/{@code leaderId}; on any leadership or term change the
         * dispatcher falls back to COMPARE.
         *
         * @return true when pushing may proceed
         */
        private boolean checkAndFreshState() {
            if (!memberState.isLeader()) {
                return false;
            }
            if (Objects.isNull(leaderId) || !leaderId.equals(memberState.getLeaderId()) || term != memberState.getCurrTerm()) {
                // double-checked under the memberState lock to read a consistent view
                synchronized (memberState) {
                    if (!memberState.isLeader()) {
                        return false;
                    }
                    Assert.isTrue(memberState.getSelfId().equals(memberState.getLeaderId()), "leaderId<>selfId");
                    this.term = memberState.getCurrTerm();
                    this.leaderId = memberState.getSelfId();
                    changeState(-1, Type.COMPARE);
                }
            }
            return true;
        }


        /** Switches the dispatcher mode and resets the indexes that belong to it. */
        private void changeState(long index, Type expectedType) {
            log.info("entryPusher of type state change from [{}] to [{}] index[{}]", type.get(), expectedType, index);
            switch (expectedType) {
                case APPEND:
                    compareIndex = -1;
                    // the next append goes right after the agreed index
                    prepareWriteOffset = index + 1;
                    // the agreed index is by definition already stored on the peer
                    updatePeerWaterMark(term, peerId, index);
                    // let the quorum checker re-evaluate the committed index
                    quorumAckChecker.wakeup();
                    break;
                case COMPARE:
                    // when falling back from APPEND, drop in-flight bookkeeping
                    if (type.compareAndSet(Type.APPEND, expectedType)) {
                        pendingMap.clear();
                    }
                    compareIndex = -1;
                    break;
                case TRUNCATE:
                    compareIndex = -1;
                    break;
                default:
                    break;
            }
            this.type.set(expectedType);
        }
    }

}
